package cn.smileyan.demo;

import com.alibaba.fastjson2.JSONObject;
import java.util.Objects;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;


/**
 * Generic {@link ElasticsearchSinkFunction} that writes every stream element as a JSON
 * document into one fixed Elasticsearch index.
 *
 * <p>Each non-null element is converted with {@link JSONObject#from(Object)} and handed to the
 * {@link RequestIndexer} as an {@link IndexRequest}. The request carries no explicit document
 * id; only the index name and the document source are set.
 *
 * @param <T> type of the stream elements to index
 * @author Smileyan
 */
public class GenericElasticsearchSinkFunction<T> implements ElasticsearchSinkFunction<T> {
    // Sink functions are serialized when the job is deployed; pin the version id explicitly
    // instead of relying on the compiler-derived default.
    private static final long serialVersionUID = 1L;

    /** Name of the index that every generated request targets. */
    private final String indexName;

    /**
     * Creates a sink function bound to the given index.
     *
     * @param indexName name of the target Elasticsearch index; must not be {@code null}
     * @throws NullPointerException if {@code indexName} is {@code null}
     */
    public GenericElasticsearchSinkFunction(String indexName) {
        // Fail at job-construction time rather than on the first record processed.
        this.indexName = Objects.requireNonNull(indexName, "indexName");
    }

    /**
     * Converts one element into an index request and queues it on the indexer.
     *
     * <p>{@code null} elements are skipped silently.
     *
     * @param element the record to index; may be {@code null}
     * @param runtimeContext runtime context supplied by the sink (not used here)
     * @param requestIndexer indexer that receives the generated {@link IndexRequest}
     */
    @Override
    public void process(T element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        if (element == null) {
            // Nothing to index for a null record.
            return;
        }

        // Turn the element into a JSON object, which is used directly as the document source.
        JSONObject document = JSONObject.from(element);
        IndexRequest indexRequest = Requests.indexRequest()
                .index(indexName)
                .source(document);

        // Queue the request on the indexer.
        requestIndexer.add(indexRequest);
    }
}
